1   /**
2    * Copyright 2014 Netflix, Inc.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5    * use this file except in compliance with the License. You may obtain a copy of
6    * the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.*;
21  
22  import java.io.IOException;
23  import java.util.Collections;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.*;
26  
27  import org.junit.Test;
28  import org.mockito.InOrder;
29  
30  import rx.*;
31  import rx.Observable.OnSubscribe;
32  import rx.exceptions.TestException;
33  import rx.functions.*;
34  import rx.observers.TestSubscriber;
35  import rx.subjects.PublishSubject;
36  
37  public class OperatorRetryWithPredicateTest {
38      Func2<Integer, Throwable, Boolean> retryTwice = new Func2<Integer, Throwable, Boolean>() {
39          @Override
40          public Boolean call(Integer t1, Throwable t2) {
41              return t1 <= 2;
42          }
43      };
44      Func2<Integer, Throwable, Boolean> retry5 = new Func2<Integer, Throwable, Boolean>() {
45          @Override
46          public Boolean call(Integer t1, Throwable t2) {
47              return t1 <= 5;
48          }
49      };
50      Func2<Integer, Throwable, Boolean> retryOnTestException = new Func2<Integer, Throwable, Boolean>() {
51          @Override
52          public Boolean call(Integer t1, Throwable t2) {
53              return t2 instanceof IOException;
54          }
55      };
56      @Test
57      public void testWithNothingToRetry() {
58          Observable<Integer> source = Observable.range(0, 3);
59          
60          @SuppressWarnings("unchecked")
61          Observer<Integer> o = mock(Observer.class);
62          InOrder inOrder = inOrder(o);
63          
64          source.retry(retryTwice).subscribe(o);
65          
66          inOrder.verify(o).onNext(0);
67          inOrder.verify(o).onNext(1);
68          inOrder.verify(o).onNext(2);
69          inOrder.verify(o).onCompleted();
70          verify(o, never()).onError(any(Throwable.class));
71      }
72      @Test
73      public void testRetryTwice() {
74          Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
75              int count;
76              @Override
77              public void call(Subscriber<? super Integer> t1) {
78                  count++;
79                  t1.onNext(0);
80                  t1.onNext(1);
81                  if (count == 1) {
82                      t1.onError(new TestException());
83                      return;
84                  }
85                  t1.onNext(2);
86                  t1.onNext(3);
87                  t1.onCompleted();
88              }
89          });
90          
91          @SuppressWarnings("unchecked")
92          Observer<Integer> o = mock(Observer.class);
93          InOrder inOrder = inOrder(o);
94          
95          source.retry(retryTwice).subscribe(o);
96  
97          inOrder.verify(o).onNext(0);
98          inOrder.verify(o).onNext(1);
99          inOrder.verify(o).onNext(0);
100         inOrder.verify(o).onNext(1);
101         inOrder.verify(o).onNext(2);
102         inOrder.verify(o).onNext(3);
103         inOrder.verify(o).onCompleted();
104         verify(o, never()).onError(any(Throwable.class));
105         
106     }
107     @Test
108     public void testRetryTwiceAndGiveUp() {
109         Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
110             @Override
111             public void call(Subscriber<? super Integer> t1) {
112                 t1.onNext(0);
113                 t1.onNext(1);
114                 t1.onError(new TestException());
115             }
116         });
117         
118         @SuppressWarnings("unchecked")
119         Observer<Integer> o = mock(Observer.class);
120         InOrder inOrder = inOrder(o);
121         
122         source.retry(retryTwice).subscribe(o);
123 
124         inOrder.verify(o).onNext(0);
125         inOrder.verify(o).onNext(1);
126         inOrder.verify(o).onNext(0);
127         inOrder.verify(o).onNext(1);
128         inOrder.verify(o).onNext(0);
129         inOrder.verify(o).onNext(1);
130         inOrder.verify(o).onError(any(TestException.class));
131         verify(o, never()).onCompleted();
132         
133     }
134     @Test
135     public void testRetryOnSpecificException() {
136         Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
137             int count;
138             @Override
139             public void call(Subscriber<? super Integer> t1) {
140                 count++;
141                 t1.onNext(0);
142                 t1.onNext(1);
143                 if (count == 1) {
144                     t1.onError(new IOException());
145                     return;
146                 }
147                 t1.onNext(2);
148                 t1.onNext(3);
149                 t1.onCompleted();
150             }
151         });
152         
153         @SuppressWarnings("unchecked")
154         Observer<Integer> o = mock(Observer.class);
155         InOrder inOrder = inOrder(o);
156         
157         source.retry(retryOnTestException).subscribe(o);
158 
159         inOrder.verify(o).onNext(0);
160         inOrder.verify(o).onNext(1);
161         inOrder.verify(o).onNext(0);
162         inOrder.verify(o).onNext(1);
163         inOrder.verify(o).onNext(2);
164         inOrder.verify(o).onNext(3);
165         inOrder.verify(o).onCompleted();
166         verify(o, never()).onError(any(Throwable.class));
167     }
168     @Test
169     public void testRetryOnSpecificExceptionAndNotOther() {
170         final IOException ioe = new IOException();
171         final TestException te = new TestException();
172         Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
173             int count;
174             @Override
175             public void call(Subscriber<? super Integer> t1) {
176                 count++;
177                 t1.onNext(0);
178                 t1.onNext(1);
179                 if (count == 1) {
180                     t1.onError(ioe);
181                     return;
182                 }
183                 t1.onNext(2);
184                 t1.onNext(3);
185                 t1.onError(te);
186             }
187         });
188         
189         @SuppressWarnings("unchecked")
190         Observer<Integer> o = mock(Observer.class);
191         InOrder inOrder = inOrder(o);
192         
193         source.retry(retryOnTestException).subscribe(o);
194 
195         inOrder.verify(o).onNext(0);
196         inOrder.verify(o).onNext(1);
197         inOrder.verify(o).onNext(0);
198         inOrder.verify(o).onNext(1);
199         inOrder.verify(o).onNext(2);
200         inOrder.verify(o).onNext(3);
201         inOrder.verify(o).onError(te);
202         verify(o, never()).onError(ioe);
203         verify(o, never()).onCompleted();
204     }
205     
206     @Test
207     public void testUnsubscribeFromRetry() {
208         PublishSubject<Integer> subject = PublishSubject.create();
209         final AtomicInteger count = new AtomicInteger(0);
210         Subscription sub = subject.retry(retryTwice).subscribe(new Action1<Integer>() {
211             @Override
212             public void call(Integer n) {
213                 count.incrementAndGet();
214             }
215         });
216         subject.onNext(1);
217         sub.unsubscribe();
218         subject.onNext(2);
219         assertEquals(1, count.get());
220     }
221     
222     @Test(timeout = 10000)
223     public void testUnsubscribeAfterError() {
224 
225         @SuppressWarnings("unchecked")
226         Observer<Long> observer = mock(Observer.class);
227 
228         // Observable that always fails after 100ms
229         OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 0);
230         Observable<Long> o = Observable
231                 .create(so)
232                 .retry(retry5);
233 
234         OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
235 
236         o.subscribe(async);
237 
238         async.await();
239 
240         InOrder inOrder = inOrder(observer);
241         // Should fail once
242         inOrder.verify(observer, times(1)).onError(any(Throwable.class));
243         inOrder.verify(observer, never()).onCompleted();
244 
245         assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
246         assertEquals("Only 1 active subscription", 1, so.maxActive.get());
247     }
248 
249     @Test(timeout = 10000)
250     public void testTimeoutWithRetry() {
251 
252         @SuppressWarnings("unchecked")
253         Observer<Long> observer = mock(Observer.class);
254 
255         // Observable that sends every 100ms (timeout fails instead)
256         OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 10);
257         Observable<Long> o = Observable
258                 .create(so)
259                 .timeout(80, TimeUnit.MILLISECONDS)
260                 .retry(retry5);
261 
262         OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
263 
264         o.subscribe(async);
265 
266         async.await();
267 
268         InOrder inOrder = inOrder(observer);
269         // Should fail once
270         inOrder.verify(observer, times(1)).onError(any(Throwable.class));
271         inOrder.verify(observer, never()).onCompleted();
272 
273         assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
274     }
275     
276     @Test
277     public void testIssue2826() {
278         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
279         final RuntimeException e = new RuntimeException("You shall not pass");
280         final AtomicInteger c = new AtomicInteger();
281         Observable.just(1).map(new Func1<Integer, Integer>() {
282             @Override
283             public Integer call(Integer t1) {
284                 c.incrementAndGet();
285                 throw e;
286             }
287         }).retry(retry5).subscribe(ts);
288 
289         ts.assertTerminalEvent();
290         assertEquals(6, c.get());
291         assertEquals(Collections.singletonList(e), ts.getOnErrorEvents());
292     }
293     @Test
294     public void testJustAndRetry() throws Exception {
295         final AtomicBoolean throwException = new AtomicBoolean(true);
296         int value = Observable.just(1).map(new Func1<Integer, Integer>() {
297             @Override
298             public Integer call(Integer t1) {
299                 if (throwException.compareAndSet(true, false)) {
300                     throw new TestException();
301                 }
302                 return t1;
303             }
304         }).retry(1).toBlocking().single();
305 
306         assertEquals(1, value);
307     }
308 }